# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc_health.v1.health."""

import threading
import time
import unittest

import grpc
from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc

from tests.unit import test_common
from tests.unit.framework.common import test_constants

from six.moves import queue

# Service names used by the tests below. The first three are registered in
# HealthServicerTest.setUp with the SERVING, UNKNOWN and NOT_SERVING statuses
# respectively.
_SERVING_SERVICE = 'grpc.test.TestServiceServing'
_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
# Not registered in setUp; the Watch tests set its status themselves.
_WATCH_SERVICE = 'grpc.test.WatchService'


def _consume_responses(response_iterator, response_queue):
    """Drains response_iterator, putting every item onto response_queue.

    Used as a thread target so a test can read streamed responses from the
    queue without blocking on the iterator itself.

    Args:
      response_iterator: An iterable of responses.
      response_queue: An object with a put() method (e.g. a queue.Queue) that
        receives each response in iteration order.

    Returns when the iterator is exhausted. Any exception raised while
    iterating or while putting an item is not caught here.
    """
    for streamed_response in response_iterator:
        response_queue.put(streamed_response)


class HealthServicerTest(unittest.TestCase):
    """Tests health.HealthServicer through a real server and client stub.

    setUp starts a server on an ephemeral local port with a HealthServicer
    attached and connects a HealthStub to it over an insecure channel; every
    test issues Check or Watch RPCs through that stub.
    """

    def setUp(self):
        """Starts the server with pre-registered statuses and opens a stub."""
        self._servicer = health.HealthServicer()
        self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
        self._servicer.set(_SERVING_SERVICE,
                           health_pb2.HealthCheckResponse.SERVING)
        self._servicer.set(_UNKNOWN_SERVICE,
                           health_pb2.HealthCheckResponse.UNKNOWN)
        self._servicer.set(_NOT_SERVING_SERVICE,
                           health_pb2.HealthCheckResponse.NOT_SERVING)
        self._server = test_common.test_server()
        # Port 0 asks for any free port; the bound port is returned.
        port = self._server.add_insecure_port('[::]:0')
        health_pb2_grpc.add_HealthServicer_to_server(self._servicer,
                                                     self._server)
        self._server.start()

        self._channel = grpc.insecure_channel('localhost:%d' % port)
        self._stub = health_pb2_grpc.HealthStub(self._channel)

    def tearDown(self):
        """Stops the server (no grace period) and closes the channel."""
        self._server.stop(None)
        self._channel.close()

    def _start_watch(self, service):
        """Opens a Watch stream for service and consumes it on a new thread.

        Args:
          service: Name of the service to watch.

        Returns:
          A (rendezvous, thread, response_queue) tuple: the object returned
          by the stub's Watch call, the already-started thread running
          _consume_responses on it, and the queue that thread fills.
        """
        request = health_pb2.HealthCheckRequest(service=service)
        response_queue = queue.Queue()
        rendezvous = self._stub.Watch(request)
        thread = threading.Thread(
            target=_consume_responses, args=(rendezvous, response_queue))
        thread.start()
        return rendezvous, thread, response_queue

    def _next_status(self, response_queue):
        """Returns the status of the next response taken from response_queue.

        Raises:
          queue.Empty: If no response is available within
            test_constants.SHORT_TIMEOUT.
        """
        response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
        return response.status

    def _stop_watch(self, rendezvous, thread, response_queue):
        """Cancels a watch, joins its consumer and checks nothing is left.

        Args:
          rendezvous: The Watch call object to cancel.
          thread: The consumer thread started for that call.
          response_queue: The queue the consumer filled; asserted to be empty
            once the thread has finished.
        """
        rendezvous.cancel()
        thread.join()
        self.assertTrue(response_queue.empty())

    def test_check_empty_service(self):
        # A request with no service name set; setUp registered '' as SERVING.
        request = health_pb2.HealthCheckRequest()
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    def test_check_serving_service(self):
        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    def test_check_unknown_serivce(self):
        # The service is registered, but with the UNKNOWN status.
        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)

    def test_check_not_serving_service(self):
        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         resp.status)

    def test_check_not_found_service(self):
        # A name that was never registered makes Check fail with NOT_FOUND.
        request = health_pb2.HealthCheckRequest(service='not-found')
        with self.assertRaises(grpc.RpcError) as context:
            self._stub.Check(request)

        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())

    def test_watch_empty_service(self):
        # '' is registered as SERVING in setUp, so that is the first status
        # streamed; nothing else may arrive before the watch is cancelled.
        rendezvous, thread, response_queue = self._start_watch('')

        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         self._next_status(response_queue))

        self._stop_watch(rendezvous, thread, response_queue)

    def test_watch_new_service(self):
        # _WATCH_SERVICE is not registered in setUp: the stream starts with
        # SERVICE_UNKNOWN and then reports each status set afterwards.
        rendezvous, thread, response_queue = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         self._next_status(response_queue))

        self._servicer.set(_WATCH_SERVICE,
                           health_pb2.HealthCheckResponse.SERVING)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         self._next_status(response_queue))

        self._servicer.set(_WATCH_SERVICE,
                           health_pb2.HealthCheckResponse.NOT_SERVING)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         self._next_status(response_queue))

        self._stop_watch(rendezvous, thread, response_queue)

    def test_watch_service_isolation(self):
        # Setting the status of a different service must not produce any
        # message on this watch.
        rendezvous, thread, response_queue = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         self._next_status(response_queue))

        self._servicer.set('some-other-service',
                           health_pb2.HealthCheckResponse.SERVING)
        # Expect the full timeout to elapse with nothing delivered.
        with self.assertRaises(queue.Empty):
            response_queue.get(timeout=test_constants.SHORT_TIMEOUT)

        self._stop_watch(rendezvous, thread, response_queue)

    def test_two_watchers(self):
        # Two concurrent watches on the same service must both receive the
        # initial status and the subsequent update.
        rendezvous1, thread1, response_queue1 = self._start_watch(
            _WATCH_SERVICE)
        rendezvous2, thread2, response_queue2 = self._start_watch(
            _WATCH_SERVICE)

        status1 = self._next_status(response_queue1)
        status2 = self._next_status(response_queue2)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         status1)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         status2)

        self._servicer.set(_WATCH_SERVICE,
                           health_pb2.HealthCheckResponse.SERVING)
        status1 = self._next_status(response_queue1)
        status2 = self._next_status(response_queue2)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, status1)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, status2)

        self._stop_watch(rendezvous1, thread1, response_queue1)
        self._stop_watch(rendezvous2, thread2, response_queue2)

    def test_cancelled_watch_removed_from_watch_list(self):
        # After the client cancels, a status change must not reach the client
        # and the servicer's watcher collection for the service must end up
        # empty.
        rendezvous, thread, response_queue = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         self._next_status(response_queue))

        rendezvous.cancel()
        self._servicer.set(_WATCH_SERVICE,
                           health_pb2.HealthCheckResponse.SERVING)
        thread.join()

        # Wait, if necessary, for serving thread to process client
        # cancellation. This inspects the servicer's private _watchers
        # attribute, polling once per second until it is empty or the
        # deadline passes.
        deadline = time.time() + test_constants.SHORT_TIMEOUT
        while (time.time() < deadline and
               self._servicer._watchers[_WATCH_SERVICE]):
            time.sleep(1)
        self.assertFalse(self._servicer._watchers[_WATCH_SERVICE],
                         'watch set should be empty')
        self.assertTrue(response_queue.empty())

    def test_health_service_name(self):
        self.assertEqual('grpc.health.v1.Health', health.SERVICE_NAME)


# Run the tests with verbose output when this module is executed as a script.
if __name__ == '__main__':
    unittest.main(verbosity=2)
